// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef _ALL_SOURCE
#define _ALL_SOURCE  // Enables thrd_create_with_name in <threads.h>.
#endif
#include <assert.h>
#include <lib/async-loop/loop.h>
#include <lib/async/default.h>
#include <lib/async/receiver.h>
#include <lib/async/task.h>
#include <lib/async/trap.h>
#include <lib/async/wait.h>
#include <stdatomic.h>
#include <stdlib.h>
#include <zircon/assert.h>
#include <zircon/listnode.h>
#include <zircon/syscalls.h>
#include <zircon/syscalls/hypervisor.h>

// The port wait key associated with the dispatcher's control messages.
// Packets with this key are produced by the loop itself (thread wake-up
// packets and the task timer wait).  Every other key read from the port is
// treated as a pointer to a client wait, receiver, or trap object.
#define KEY_CONTROL (0u)

// Forward declarations of the dispatcher operations implemented by this
// loop.  They are referenced by the |async_loop_ops| table below and defined
// later in this file.
static zx_time_t async_loop_now(async_dispatcher_t* dispatcher);
static zx_status_t async_loop_begin_wait(async_dispatcher_t* dispatcher, async_wait_t* wait);
static zx_status_t async_loop_cancel_wait(async_dispatcher_t* dispatcher, async_wait_t* wait);
static zx_status_t async_loop_post_task(async_dispatcher_t* dispatcher, async_task_t* task);
static zx_status_t async_loop_cancel_task(async_dispatcher_t* dispatcher, async_task_t* task);
static zx_status_t async_loop_queue_packet(async_dispatcher_t* dispatcher,
                                           async_receiver_t* receiver,
                                           const zx_packet_user_t* data);
static zx_status_t async_loop_set_guest_bell_trap(async_dispatcher_t* dispatcher,
                                                  async_guest_bell_trap_t* trap, zx_handle_t guest,
                                                  zx_vaddr_t addr, size_t length);

// Operation table that routes the generic dispatcher interface to this
// loop's implementation.  async_loop_create() points every loop's
// |dispatcher.ops| at this single shared table.
static const async_ops_t async_loop_ops = {
    .version = ASYNC_OPS_V1,
    .reserved = 0,
    .v1 = {
        .now = async_loop_now,
        .begin_wait = async_loop_begin_wait,
        .cancel_wait = async_loop_cancel_wait,
        .post_task = async_loop_post_task,
        .cancel_task = async_loop_cancel_task,
        .queue_packet = async_loop_queue_packet,
        .set_guest_bell_trap = async_loop_set_guest_bell_trap,
    }};

// Bookkeeping record for one thread started by async_loop_start_thread().
// Records are heap-allocated, linked into the loop's |thread_list|, and
// freed by async_loop_join_threads().
typedef struct thread_record {
  // Must remain the first member: async_loop_join_threads() casts the list
  // node pointer directly to a thread_record_t*.
  list_node_t node;
  thrd_t thread;  // handle used to join the thread
} thread_record_t;

// Configuration under which async_loop_create() installs the new loop as the
// default dispatcher of the calling thread, using the standard
// async_get_default_dispatcher / async_set_default_dispatcher accessors.
const async_loop_config_t kAsyncLoopConfigAttachToThread = {
    .make_default_for_current_thread = true,
    .default_accessors = {
        .getter = async_get_default_dispatcher,
        .setter = async_set_default_dispatcher,
    }};
// Configuration under which the creating thread's default dispatcher is left
// alone.  The accessors are still provided, so threads started through
// async_loop_start_thread() install the loop as their own default.
const async_loop_config_t kAsyncLoopConfigNoAttachToThread = {
    .make_default_for_current_thread = false,
    .default_accessors = {
        .getter = async_get_default_dispatcher,
        .setter = async_set_default_dispatcher,
    }};
// Configuration with no default-dispatcher accessors at all: neither the
// creating thread nor threads started by the loop have the loop installed as
// their default dispatcher.
const async_loop_config_t kAsyncLoopConfigNeverAttachToThread = {
    .make_default_for_current_thread = false,
    .default_accessors = {.getter = NULL, .setter = NULL}};

// State of one message loop.
//
// |state| and |active_threads| are read and written with atomic operations
// and do not require |lock|.  The lists and |dispatching_tasks| are guarded
// by |lock|.  Code in this file casts freely between async_loop_t* and
// async_dispatcher_t*, which is why |dispatcher| has to be the first member.
typedef struct async_loop {
  async_dispatcher_t dispatcher;  // must be first (the loop inherits from async_dispatcher_t)
  async_loop_config_t config;     // immutable
  zx_handle_t port;               // immutable
  zx_handle_t timer;              // immutable

  _Atomic async_loop_state_t state;
  atomic_uint active_threads;  // number of active dispatch threads

  mtx_t lock;               // guards the lists and the dispatching tasks flag
  bool dispatching_tasks;   // true while the loop is busy dispatching tasks
  list_node_t wait_list;    // most recently added first
  list_node_t task_list;    // pending tasks, earliest deadline first
  list_node_t due_list;     // due tasks, earliest deadline first
  list_node_t thread_list;  // earliest created thread first
  bool timer_armed;         // true if timer has been set and has not fired yet
} async_loop_t;

// Forward declarations of internal helpers that are defined later in this
// file but used before their definitions.
static zx_status_t async_loop_run_once(async_loop_t* loop, zx_time_t deadline);
static zx_status_t async_loop_dispatch_wait(async_loop_t* loop, async_wait_t* wait,
                                            zx_status_t status, const zx_packet_signal_t* signal);
static zx_status_t async_loop_dispatch_tasks(async_loop_t* loop);
static void async_loop_dispatch_task(async_loop_t* loop, async_task_t* task, zx_status_t status);
static zx_status_t async_loop_dispatch_packet(async_loop_t* loop, async_receiver_t* receiver,
                                              zx_status_t status, const zx_packet_user_t* data);
static zx_status_t async_loop_dispatch_guest_bell_trap(async_loop_t* loop,
                                                       async_guest_bell_trap_t* trap,
                                                       zx_status_t status,
                                                       const zx_packet_guest_bell_t* bell);
static void async_loop_wake_threads(async_loop_t* loop);
static void async_loop_insert_task_locked(async_loop_t* loop, async_task_t* task);
static void async_loop_restart_timer_locked(async_loop_t* loop);
static void async_loop_invoke_prologue(async_loop_t* loop);
static void async_loop_invoke_epilogue(async_loop_t* loop);

// The loop overlays a list_node_t on the |state| field of client wait and
// task objects (see TO_NODE / FROM_NODE), so that field must be large enough
// to hold one.
static_assert(sizeof(list_node_t) <= sizeof(async_state_t), "async_state_t too small");

// Convert between a client object (async_wait_t, async_task_t) and the list
// node that the loop stores in the object's |state| field.
//
// TO_NODE(type, ptr):   pointer to object -> pointer to its embedded node.
// FROM_NODE(type, ptr): pointer to embedded node -> pointer to the object.
//
// Every use of |ptr| is parenthesized so that arbitrary pointer expressions
// (for example |base + 1|) expand correctly.  |type| is not needed by
// TO_NODE; it is accepted only to keep the two macros symmetric.
#define TO_NODE(type, ptr) ((list_node_t*)&(ptr)->state)
#define FROM_NODE(type, ptr) ((type*)((char*)(ptr) - offsetof(type, state)))

// Returns the list node that lives in the |state| storage of |wait|.
static inline list_node_t* wait_to_node(async_wait_t* wait) {
  list_node_t* const embedded = TO_NODE(async_wait_t, wait);
  return embedded;
}

// Recovers the async_wait_t whose |state| storage contains |node|.
static inline async_wait_t* node_to_wait(list_node_t* node) {
  async_wait_t* const owner = FROM_NODE(async_wait_t, node);
  return owner;
}

// Returns the list node that lives in the |state| storage of |task|.
static inline list_node_t* task_to_node(async_task_t* task) {
  list_node_t* const embedded = TO_NODE(async_task_t, task);
  return embedded;
}

// Recovers the async_task_t whose |state| storage contains |node|.
static inline async_task_t* node_to_task(list_node_t* node) {
  async_task_t* const owner = FROM_NODE(async_task_t, node);
  return owner;
}

// Allocates and initializes a new loop described by |config|.
//
// On success, stores the loop in |*out_loop| and returns ZX_OK; when the
// configuration requests it, the loop is also installed as the calling
// thread's default dispatcher.  Returns ZX_ERR_NO_MEMORY if the loop cannot
// be allocated, or the error reported by zx_port_create() /
// zx_timer_create().  On failure the partially built loop is destroyed and
// |*out_loop| is not written.
zx_status_t async_loop_create(const async_loop_config_t* config, async_loop_t** out_loop) {
  ZX_DEBUG_ASSERT(out_loop);
  ZX_DEBUG_ASSERT(config != NULL);
  // The accessors come as a pair: either both are supplied or neither is.
  ZX_ASSERT((config->default_accessors.setter != NULL) ==
            (config->default_accessors.getter != NULL));

  async_loop_t* loop = calloc(1u, sizeof(async_loop_t));
  if (loop == NULL) {
    return ZX_ERR_NO_MEMORY;
  }

  atomic_init(&loop->state, ASYNC_LOOP_RUNNABLE);
  atomic_init(&loop->active_threads, 0u);
  loop->dispatcher.ops = &async_loop_ops;

  loop->config = *config;
  if (config->make_default_for_current_thread && config->default_accessors.setter == NULL) {
    // The caller asked to become the default but supplied no accessors, so
    // fall back to the standard ones.
    loop->config.default_accessors.getter = async_get_default_dispatcher;
    loop->config.default_accessors.setter = async_set_default_dispatcher;
  }

  mtx_init(&loop->lock, mtx_plain);
  list_initialize(&loop->wait_list);
  list_initialize(&loop->task_list);
  list_initialize(&loop->due_list);
  list_initialize(&loop->thread_list);

  zx_status_t status = zx_port_create(0u, &loop->port);
  if (status == ZX_OK) {
    status = zx_timer_create(ZX_TIMER_SLACK_LATE, ZX_CLOCK_MONOTONIC, &loop->timer);
  }

  if (status != ZX_OK) {
    // The loop was never installed as a default dispatcher.  Clear the flag
    // so that shutdown does not trip an assert trying to uninstall it.
    loop->config.make_default_for_current_thread = false;
    async_loop_destroy(loop);
    return status;
  }

  *out_loop = loop;
  if (loop->config.make_default_for_current_thread) {
    ZX_DEBUG_ASSERT(loop->config.default_accessors.getter() == NULL);
    loop->config.default_accessors.setter(&loop->dispatcher);
  }
  return ZX_OK;
}

// Shuts the loop down (a no-op if it has already been shut down), then
// closes the port and timer handles, destroys the lock, and frees the loop.
// |loop| must not be used after this call returns.
void async_loop_destroy(async_loop_t* loop) {
  ZX_DEBUG_ASSERT(loop);

  // Must run first: shutdown wakes threads through the port and takes the
  // lock while joining them, so both have to be alive until it completes.
  async_loop_shutdown(loop);

  zx_handle_close(loop->port);
  zx_handle_close(loop->timer);
  mtx_destroy(&loop->lock);
  free(loop);
}

// Moves the loop to ASYNC_LOOP_SHUTDOWN, wakes and joins the threads started
// by the loop, then invokes every still-pending wait and task handler with
// ZX_ERR_CANCELED.  If the loop was installed as the current thread's
// default dispatcher, that registration is cleared.  Calling this again on a
// loop that is already shut down does nothing.
void async_loop_shutdown(async_loop_t* loop) {
  ZX_DEBUG_ASSERT(loop);

  // Only the caller that actually flips the state performs the teardown.
  if (atomic_exchange_explicit(&loop->state, ASYNC_LOOP_SHUTDOWN, memory_order_acq_rel) ==
      ASYNC_LOOP_SHUTDOWN) {
    return;
  }

  async_loop_wake_threads(loop);
  async_loop_join_threads(loop);

  // Each list is drained one node at a time: the next node is only fetched
  // after the previous handler has returned.
  for (list_node_t* pending = list_remove_head(&loop->wait_list); pending != NULL;
       pending = list_remove_head(&loop->wait_list)) {
    async_loop_dispatch_wait(loop, node_to_wait(pending), ZX_ERR_CANCELED, NULL);
  }
  for (list_node_t* pending = list_remove_head(&loop->due_list); pending != NULL;
       pending = list_remove_head(&loop->due_list)) {
    async_loop_dispatch_task(loop, node_to_task(pending), ZX_ERR_CANCELED);
  }
  for (list_node_t* pending = list_remove_head(&loop->task_list); pending != NULL;
       pending = list_remove_head(&loop->task_list)) {
    async_loop_dispatch_task(loop, node_to_task(pending), ZX_ERR_CANCELED);
  }

  if (!loop->config.make_default_for_current_thread) {
    return;
  }
  ZX_DEBUG_ASSERT(loop->config.default_accessors.getter() == &loop->dispatcher);
  loop->config.default_accessors.setter(NULL);
}

// Runs the loop on the calling thread until an iteration returns a status
// other than ZX_OK, or after a single iteration when |once| is true.
// |deadline| is forwarded to every iteration.  Returns the status of the
// last iteration.  The calling thread is counted in |active_threads| for the
// duration of the call.
zx_status_t async_loop_run(async_loop_t* loop, zx_time_t deadline, bool once) {
  ZX_DEBUG_ASSERT(loop);

  atomic_fetch_add_explicit(&loop->active_threads, 1u, memory_order_acq_rel);

  zx_status_t status;
  for (;;) {
    status = async_loop_run_once(loop, deadline);
    if (once || status != ZX_OK) {
      break;
    }
  }

  atomic_fetch_sub_explicit(&loop->active_threads, 1u, memory_order_acq_rel);
  return status;
}

// Runs the loop with a deadline of 0 until an iteration stops returning
// ZX_OK.  Running out of ready work (ZX_ERR_TIMED_OUT) is the expected way
// to finish and is reported as ZX_OK; any other status is returned as is.
zx_status_t async_loop_run_until_idle(async_loop_t* loop) {
  const zx_status_t result = async_loop_run(loop, 0, false);
  return (result == ZX_ERR_TIMED_OUT) ? ZX_OK : result;
}

// Performs one loop iteration: waits on the port until |deadline| for one
// packet and dispatches it.
//
// Returns ZX_ERR_BAD_STATE if the loop is shut down, ZX_ERR_CANCELED if it
// is in any other non-runnable state, the zx_port_wait() error if the wait
// fails, and otherwise the result of the dispatch routine (ZX_OK for
// wake-up packets).  A packet that matches none of the known shapes trips a
// debug assert and yields ZX_ERR_INTERNAL.
static zx_status_t async_loop_run_once(async_loop_t* loop, zx_time_t deadline) {
  const async_loop_state_t current = atomic_load_explicit(&loop->state, memory_order_acquire);
  if (current == ASYNC_LOOP_SHUTDOWN) {
    return ZX_ERR_BAD_STATE;
  }
  if (current != ASYNC_LOOP_RUNNABLE) {
    return ZX_ERR_CANCELED;
  }

  zx_port_packet_t packet;
  const zx_status_t wait_status = zx_port_wait(loop->port, deadline, &packet);
  if (wait_status != ZX_OK) {
    return wait_status;
  }

  if (packet.key != KEY_CONTROL) {
    // Client packets: the key is the address of the object being notified.
    if (packet.type == ZX_PKT_TYPE_SIGNAL_ONE) {
      // A wait completed; unlink it from the pending list before dispatch.
      async_wait_t* wait = (void*)(uintptr_t)packet.key;
      mtx_lock(&loop->lock);
      list_delete(wait_to_node(wait));
      mtx_unlock(&loop->lock);
      return async_loop_dispatch_wait(loop, wait, packet.status, &packet.signal);
    }
    if (packet.type == ZX_PKT_TYPE_USER) {
      // A packet queued via the dispatcher's queue_packet operation.
      async_receiver_t* receiver = (void*)(uintptr_t)packet.key;
      return async_loop_dispatch_packet(loop, receiver, packet.status, &packet.user);
    }
    if (packet.type == ZX_PKT_TYPE_GUEST_BELL) {
      // A guest bell trap fired.
      async_guest_bell_trap_t* trap = (void*)(uintptr_t)packet.key;
      return async_loop_dispatch_guest_bell_trap(loop, trap, packet.status, &packet.guest_bell);
    }
  } else if (packet.type == ZX_PKT_TYPE_USER) {
    // Wake-up packet: nothing to do beyond returning to the caller.
    return ZX_OK;
  } else if (packet.type == ZX_PKT_TYPE_SIGNAL_ONE &&
             (packet.signal.observed & ZX_TIMER_SIGNALED) != 0) {
    // The task timer expired.
    return async_loop_dispatch_tasks(loop);
  }

  ZX_DEBUG_ASSERT(false);
  return ZX_ERR_INTERNAL;
}

// Returns the dispatcher interface of |loop|.
async_dispatcher_t* async_loop_get_dispatcher(async_loop_t* loop) {
  // |dispatcher| is the first member of async_loop_t, so a pointer to the
  // loop is also a pointer to its async_dispatcher_t.
  async_dispatcher_t* const dispatcher = (async_dispatcher_t*)loop;
  return dispatcher;
}

// Converts a dispatcher pointer back into its loop with a plain cast.  No
// check is made that |async| actually belongs to an async_loop_t.
async_loop_t* async_loop_from_dispatcher(async_dispatcher_t* async) {
  async_loop_t* const loop = (async_loop_t*)async;
  return loop;
}

// Invokes the handler of |trap| with |status| and |bell|, bracketed by the
// loop's configured prologue and epilogue hooks.  Always returns ZX_OK.
static zx_status_t async_loop_dispatch_guest_bell_trap(async_loop_t* loop,
                                                       async_guest_bell_trap_t* trap,
                                                       zx_status_t status,
                                                       const zx_packet_guest_bell_t* bell) {
  async_dispatcher_t* const dispatcher = (async_dispatcher_t*)loop;

  async_loop_invoke_prologue(loop);
  trap->handler(dispatcher, trap, status, bell);
  async_loop_invoke_epilogue(loop);

  return ZX_OK;
}

// Invokes the handler of |wait| with |status| and |signal|, bracketed by the
// loop's configured prologue and epilogue hooks.  During shutdown this is
// called with ZX_ERR_CANCELED and a NULL |signal|.  Always returns ZX_OK.
static zx_status_t async_loop_dispatch_wait(async_loop_t* loop, async_wait_t* wait,
                                            zx_status_t status, const zx_packet_signal_t* signal) {
  async_dispatcher_t* const dispatcher = (async_dispatcher_t*)loop;

  async_loop_invoke_prologue(loop);
  wait->handler(dispatcher, wait, status, signal);
  async_loop_invoke_epilogue(loop);

  return ZX_OK;
}

// Dispatches every task that has come due.  Called when the task timer's
// packet is read from the port.  If another thread is already dispatching
// tasks this returns immediately.  Always returns ZX_OK.
static zx_status_t async_loop_dispatch_tasks(async_loop_t* loop) {
  // Dequeue and dispatch one task at a time in case an earlier task wants
  // to cancel a later task which has also come due.  At most one thread
  // can dispatch tasks at any given moment (to preserve serial ordering).
  // Timer restarts are suppressed until we run out of tasks to dispatch.
  mtx_lock(&loop->lock);
  if (!loop->dispatching_tasks) {
    loop->dispatching_tasks = true;

    // Extract all of the tasks that are due into |due_list| for dispatch
    // unless we already have some waiting from a previous iteration which
    // we would like to process in order.
    list_node_t* node;
    if (list_is_empty(&loop->due_list)) {
      zx_time_t due_time = async_loop_now((async_dispatcher_t*)loop);
      // |tail| ends up as the last task whose deadline is <= |due_time|,
      // or stays NULL when nothing is due yet.
      list_node_t* tail = NULL;
      list_for_every(&loop->task_list, node) {
        if (node_to_task(node)->deadline > due_time)
          break;
        tail = node;
      }
      if (tail) {
        // Splice the run [head, tail] out of |task_list| and make it the
        // entire contents of the (currently empty) |due_list|.
        list_node_t* head = loop->task_list.next;
        loop->task_list.next = tail->next;
        tail->next->prev = &loop->task_list;
        loop->due_list.next = head;
        head->prev = &loop->due_list;
        loop->due_list.prev = tail;
        tail->next = &loop->due_list;
      }
    }

    // Dispatch all due tasks.  Note that they might be canceled concurrently
    // so we need to grab the lock during each iteration to fetch the next
    // item from the list.
    while ((node = list_remove_head(&loop->due_list))) {
      mtx_unlock(&loop->lock);

      // Invoke the handler.  Note that it might destroy itself.
      async_task_t* task = node_to_task(node);
      async_loop_dispatch_task(loop, task, ZX_OK);

      mtx_lock(&loop->lock);
      // Stop early once the loop is no longer runnable.  Tasks left in
      // |due_list| make the timer restart below request an immediate fire.
      async_loop_state_t state = atomic_load_explicit(&loop->state, memory_order_acquire);
      if (state != ASYNC_LOOP_RUNNABLE)
        break;
    }

    loop->dispatching_tasks = false;
    // Mark the timer as unarmed so the restart below registers a new port
    // wait; presumably the one-shot wait was consumed by the packet that
    // brought us here.
    loop->timer_armed = false;
    async_loop_restart_timer_locked(loop);
  }
  mtx_unlock(&loop->lock);
  return ZX_OK;
}

// Invokes the handler of |task| with |status|, bracketed by the loop's
// configured prologue and epilogue hooks.
static void async_loop_dispatch_task(async_loop_t* loop, async_task_t* task, zx_status_t status) {
  async_dispatcher_t* const dispatcher = (async_dispatcher_t*)loop;

  async_loop_invoke_prologue(loop);
  // The handler may destroy |task|, so it must not be touched afterwards.
  task->handler(dispatcher, task, status);
  async_loop_invoke_epilogue(loop);
}

// Invokes the handler of |receiver| with |status| and |data|, bracketed by
// the loop's configured prologue and epilogue hooks.  Always returns ZX_OK.
static zx_status_t async_loop_dispatch_packet(async_loop_t* loop, async_receiver_t* receiver,
                                              zx_status_t status, const zx_packet_user_t* data) {
  async_dispatcher_t* const dispatcher = (async_dispatcher_t*)loop;

  async_loop_invoke_prologue(loop);
  // The handler may destroy |receiver|, so it must not be touched afterwards.
  receiver->handler(dispatcher, receiver, status, data);
  async_loop_invoke_epilogue(loop);

  return ZX_OK;
}

// Requests that the loop stop running: moves the state from
// ASYNC_LOOP_RUNNABLE to ASYNC_LOOP_QUIT and wakes the active threads so
// they notice.  Has no effect when the loop is in any other state.
void async_loop_quit(async_loop_t* loop) {
  ZX_DEBUG_ASSERT(loop);

  async_loop_state_t expected = ASYNC_LOOP_RUNNABLE;
  if (atomic_compare_exchange_strong_explicit(&loop->state, &expected, ASYNC_LOOP_QUIT,
                                              memory_order_acq_rel, memory_order_acquire)) {
    async_loop_wake_threads(loop);
  }
}

// Queues one control wake-up packet per active thread so that every thread
// blocked on the port returns and re-examines the loop state.
static void async_loop_wake_threads(async_loop_t* loop) {
  // A thread joining the pool increments the active thread count before it
  // checks the loop state, so the count read here is never smaller than the
  // number of threads that may be blocked in |port_wait|.  Queuing more
  // packets than necessary is harmless.
  uint32_t remaining = atomic_load_explicit(&loop->active_threads, memory_order_acquire);
  while (remaining-- > 0u) {
    zx_port_packet_t wake_packet = {
        .key = KEY_CONTROL, .type = ZX_PKT_TYPE_USER, .status = ZX_OK};
    const zx_status_t status = zx_port_queue(loop->port, &wake_packet);
    ZX_ASSERT_MSG(status == ZX_OK, "zx_port_queue: status=%d", status);
  }
}

// Returns a quit loop to the runnable state.
//
// Returns ZX_OK if the loop was moved from ASYNC_LOOP_QUIT to
// ASYNC_LOOP_RUNNABLE, or if it is observed to be runnable already.  Returns
// ZX_ERR_BAD_STATE if any thread is still running the loop or if the loop is
// in any other state.
zx_status_t async_loop_reset_quit(async_loop_t* loop) {
  ZX_DEBUG_ASSERT(loop);

  // Refuse to reset while threads are still active.  The check is inherently
  // racy but not dangerously so; it is mainly a sanity check that lets us
  // make a stronger statement about how this function is meant to be used.
  if (atomic_load_explicit(&loop->active_threads, memory_order_acquire) != 0) {
    return ZX_ERR_BAD_STATE;
  }

  async_loop_state_t expected = ASYNC_LOOP_QUIT;
  if (atomic_compare_exchange_strong_explicit(&loop->state, &expected, ASYNC_LOOP_RUNNABLE,
                                              memory_order_acq_rel, memory_order_acquire)) {
    return ZX_OK;
  }

  // The loop was not in the quit state.  Re-read the state: a loop that is
  // runnable at this point still counts as success.
  const async_loop_state_t observed = atomic_load_explicit(&loop->state, memory_order_acquire);
  return (observed == ASYNC_LOOP_RUNNABLE) ? ZX_OK : ZX_ERR_BAD_STATE;
}

// Returns a snapshot of the loop's current state (acquire load).
async_loop_state_t async_loop_get_state(async_loop_t* loop) {
  ZX_DEBUG_ASSERT(loop);

  const async_loop_state_t snapshot = atomic_load_explicit(&loop->state, memory_order_acquire);
  return snapshot;
}

// Dispatcher |now| operation: returns the current monotonic time.  The
// function keeps internal linkage from its earlier static declaration.
zx_time_t async_loop_now(async_dispatcher_t* dispatcher) {
  (void)dispatcher;  // unused
  return zx_clock_get_monotonic();
}

// Dispatcher |begin_wait| operation: registers |wait| with the loop's port
// and records it in the pending wait list.
//
// Returns ZX_ERR_BAD_STATE if the loop is shut down, otherwise the result of
// zx_object_wait_async().  Any failure other than ZX_ERR_ACCESS_DENIED is
// treated as a fatal error.
static zx_status_t async_loop_begin_wait(async_dispatcher_t* async, async_wait_t* wait) {
  async_loop_t* const loop = (async_loop_t*)async;
  ZX_DEBUG_ASSERT(loop);
  ZX_DEBUG_ASSERT(wait);

  const async_loop_state_t state = atomic_load_explicit(&loop->state, memory_order_acquire);
  if (state == ASYNC_LOOP_SHUTDOWN) {
    return ZX_ERR_BAD_STATE;
  }

  // The lock is held across the syscall and the list insertion.
  mtx_lock(&loop->lock);
  const zx_status_t status =
      zx_object_wait_async(wait->object, loop->port, (uintptr_t)wait, wait->trigger, wait->options);
  if (status != ZX_OK) {
    ZX_ASSERT_MSG(status == ZX_ERR_ACCESS_DENIED, "zx_object_wait_async: status=%d", status);
  } else {
    list_add_head(&loop->wait_list, wait_to_node(wait));
  }
  mtx_unlock(&loop->lock);

  return status;
}

// Dispatcher |cancel_wait| operation: cancels a pending wait.
//
// Returns ZX_OK if the wait was canceled, or ZX_ERR_NOT_FOUND if the wait is
// not pending or its packet has already been taken off the port.  Any other
// zx_port_cancel() failure is treated as a fatal error.
static zx_status_t async_loop_cancel_wait(async_dispatcher_t* async, async_wait_t* wait) {
  async_loop_t* const loop = (async_loop_t*)async;
  ZX_DEBUG_ASSERT(loop);
  ZX_DEBUG_ASSERT(wait);

  // Cancellations are processed even while the loop is being destroyed,
  // since the client may rely on the handler never running again after this
  // call.

  zx_status_t status = ZX_ERR_NOT_FOUND;

  mtx_lock(&loop->lock);
  list_node_t* const node = wait_to_node(wait);
  if (list_in_list(node)) {
    // The wait is pending.  Cancelling can race with another thread that has
    // already read the wait's packet but not yet dispatched it; a failed
    // cancel therefore means we lost that race.
    status = zx_port_cancel(loop->port, wait->object, (uintptr_t)wait);
    if (status != ZX_OK) {
      ZX_ASSERT_MSG(status == ZX_ERR_NOT_FOUND, "zx_port_cancel: status=%d", status);
    } else {
      list_delete(node);
    }
  }
  mtx_unlock(&loop->lock);

  return status;
}

// Dispatcher |post_task| operation: queues |task| in deadline order.
// Returns ZX_ERR_BAD_STATE if the loop is shut down, ZX_OK otherwise.
static zx_status_t async_loop_post_task(async_dispatcher_t* async, async_task_t* task) {
  async_loop_t* const loop = (async_loop_t*)async;
  ZX_DEBUG_ASSERT(loop);
  ZX_DEBUG_ASSERT(task);

  const async_loop_state_t state = atomic_load_explicit(&loop->state, memory_order_acquire);
  if (state == ASYNC_LOOP_SHUTDOWN) {
    return ZX_ERR_BAD_STATE;
  }

  mtx_lock(&loop->lock);
  async_loop_insert_task_locked(loop, task);
  // While tasks are being dispatched the timer is left alone; the
  // dispatching thread restarts it when it finishes.
  if (!loop->dispatching_tasks) {
    if (task_to_node(task)->prev == &loop->task_list) {
      // The new task became the head, so the earliest deadline changed.
      async_loop_restart_timer_locked(loop);
    }
  }
  mtx_unlock(&loop->lock);

  return ZX_OK;
}

// Dispatcher |cancel_task| operation: removes a queued task.
// Returns ZX_OK if the task was removed, or ZX_ERR_NOT_FOUND if it was not
// on any list.
static zx_status_t async_loop_cancel_task(async_dispatcher_t* async, async_task_t* task) {
  async_loop_t* const loop = (async_loop_t*)async;
  ZX_DEBUG_ASSERT(loop);
  ZX_DEBUG_ASSERT(task);

  // Cancellations are processed even while the loop is being destroyed,
  // since the client may rely on the handler never running again after this
  // call.  The task may be sitting in |due_list| (awaiting dispatch) rather
  // than in |task_list|; the same removal logic covers both.

  zx_status_t status = ZX_ERR_NOT_FOUND;

  mtx_lock(&loop->lock);
  list_node_t* const node = task_to_node(task);
  if (list_in_list(node)) {
    // The timer only needs to move when the head of |task_list| is removed
    // and what follows it (if anything) has a later deadline.  This must be
    // decided before the node is unlinked.
    bool must_restart = false;
    if (!loop->dispatching_tasks && node->prev == &loop->task_list) {
      list_node_t* const successor = node->next;
      must_restart = successor == &loop->task_list ||
                     node_to_task(successor)->deadline > task->deadline;
    }

    list_delete(node);
    if (must_restart) {
      async_loop_restart_timer_locked(loop);
    }
    status = ZX_OK;
  }
  mtx_unlock(&loop->lock);

  return status;
}

// Dispatcher |queue_packet| operation: queues a user packet on the loop's
// port, keyed by |receiver|.  When |data| is NULL the payload is left
// zero-initialized.  Returns ZX_ERR_BAD_STATE if the loop is shut down,
// otherwise the result of zx_port_queue().
static zx_status_t async_loop_queue_packet(async_dispatcher_t* async, async_receiver_t* receiver,
                                           const zx_packet_user_t* data) {
  async_loop_t* const loop = (async_loop_t*)async;
  ZX_DEBUG_ASSERT(loop);
  ZX_DEBUG_ASSERT(receiver);

  const async_loop_state_t state = atomic_load_explicit(&loop->state, memory_order_acquire);
  if (state == ASYNC_LOOP_SHUTDOWN) {
    return ZX_ERR_BAD_STATE;
  }

  zx_port_packet_t outgoing = {
      .key = (uintptr_t)receiver, .type = ZX_PKT_TYPE_USER, .status = ZX_OK};
  if (data != NULL) {
    outgoing.user = *data;
  }
  return zx_port_queue(loop->port, &outgoing);
}

// Dispatcher |set_guest_bell_trap| operation: installs a bell trap on
// |guest| covering [|addr|, |addr| + |length|) that delivers packets to the
// loop's port, keyed by |trap|.
//
// Returns ZX_ERR_BAD_STATE if the loop is shut down, otherwise the result of
// zx_guest_set_trap().  A failure status outside the expected set is
// treated as a fatal error.
static zx_status_t async_loop_set_guest_bell_trap(async_dispatcher_t* async,
                                                  async_guest_bell_trap_t* trap, zx_handle_t guest,
                                                  zx_vaddr_t addr, size_t length) {
  async_loop_t* const loop = (async_loop_t*)async;
  ZX_DEBUG_ASSERT(loop);
  ZX_DEBUG_ASSERT(trap);

  const async_loop_state_t state = atomic_load_explicit(&loop->state, memory_order_acquire);
  if (state == ASYNC_LOOP_SHUTDOWN) {
    return ZX_ERR_BAD_STATE;
  }

  const zx_status_t status =
      zx_guest_set_trap(guest, ZX_GUEST_TRAP_BELL, addr, length, loop->port, (uintptr_t)trap);
  if (status == ZX_OK) {
    return ZX_OK;
  }

  // Only these failures are considered recoverable by the caller.
  ZX_ASSERT_MSG(status == ZX_ERR_ACCESS_DENIED || status == ZX_ERR_ALREADY_EXISTS ||
                    status == ZX_ERR_INVALID_ARGS || status == ZX_ERR_OUT_OF_RANGE ||
                    status == ZX_ERR_WRONG_TYPE,
                "zx_guest_set_trap: status=%d", status);
  return status;
}

// Inserts |task| into |task_list|, keeping the list sorted by deadline.  A
// task is placed after any existing tasks with the same deadline.  The
// caller must hold |lock|.
static void async_loop_insert_task_locked(async_loop_t* loop, async_task_t* task) {
  // TODO(ZX-976): Tasks are assumed to arrive in quasi-monotonic order, so a
  // backwards scan from the tail should normally stop within a few steps.
  // If that assumption proves false and insertion cost becomes a problem, a
  // more efficient ordered representation should be considered.
  list_node_t* position = loop->task_list.prev;
  while (position != &loop->task_list && task->deadline < node_to_task(position)->deadline) {
    position = position->prev;
  }
  list_add_after(position, task_to_node(task));
}

// Computes the time at which the task timer should fire next:
//   - 0 (fire immediately) while tasks are waiting in |due_list|;
//   - ZX_TIME_INFINITE when no tasks are pending;
//   - otherwise the deadline of the earliest pending task (which may itself
//     be ZX_TIME_INFINITE).
// The caller must hold |lock|.
static zx_time_t async_loop_next_deadline_locked(async_loop_t* loop) {
  if (!list_is_empty(&loop->due_list)) {
    // Fire now.
    return 0ULL;
  }

  list_node_t* const earliest = list_peek_head(&loop->task_list);
  if (earliest == NULL) {
    return ZX_TIME_INFINITE;
  }
  return node_to_task(earliest)->deadline;
}

// Re-programs the task timer to match the loop's next deadline.  All callers
// in this file invoke it with |lock| held.  Unexpected syscall failures are
// treated as fatal errors.
static void async_loop_restart_timer_locked(async_loop_t* loop) {
  zx_status_t status;
  zx_time_t deadline = async_loop_next_deadline_locked(loop);

  if (deadline == ZX_TIME_INFINITE) {
    // Nothing is left on the queue to fire.
    if (loop->timer_armed) {
      // Disarm both the timer and the port wait registered on it.
      status = zx_timer_cancel(loop->timer);
      ZX_ASSERT_MSG(status == ZX_OK, "zx_timer_cancel: status=%d", status);
      // ZX_ERR_NOT_FOUND can happen here when a pending timer fires and
      // the packet is picked up by port_wait in another thread but has
      // not reached dispatch.
      status = zx_port_cancel(loop->port, loop->timer, KEY_CONTROL);
      ZX_ASSERT_MSG(status == ZX_OK || status == ZX_ERR_NOT_FOUND, "zx_port_cancel: status=%d",
                    status);
      loop->timer_armed = false;
    }

    return;
  }

  // (Re)program the timer for the new deadline, with zero slack.
  status = zx_timer_set(loop->timer, deadline, 0);
  ZX_ASSERT_MSG(status == ZX_OK, "zx_timer_set: status=%d", status);

  // Register a one-shot port wait for the timer signal, unless one is
  // already outstanding.
  if (!loop->timer_armed) {
    loop->timer_armed = true;
    status = zx_object_wait_async(loop->timer, loop->port, KEY_CONTROL, ZX_TIMER_SIGNALED,
                                  ZX_WAIT_ASYNC_ONCE);
    ZX_ASSERT_MSG(status == ZX_OK, "zx_object_wait_async: status=%d", status);
  }
}

// Calls the configured prologue hook, if any, with the loop and the
// configuration's user data.  Invoked immediately before each handler.
static void async_loop_invoke_prologue(async_loop_t* loop) {
  if (!loop->config.prologue) {
    return;
  }
  loop->config.prologue(loop, loop->config.data);
}

// Calls the configured epilogue hook, if any, with the loop and the
// configuration's user data.  Invoked immediately after each handler.
static void async_loop_invoke_epilogue(async_loop_t* loop) {
  if (!loop->config.epilogue) {
    return;
  }
  loop->config.epilogue(loop, loop->config.data);
}

// Entry point of threads created by async_loop_start_thread().  |data| is
// the loop.  Installs the loop as this thread's default dispatcher when a
// setter is configured, then runs the loop with no deadline until it stops.
// The final run status is discarded; the thread always exits with 0.
static int async_loop_run_thread(void* data) {
  async_loop_t* const loop = (async_loop_t*)data;

  if (loop->config.default_accessors.setter != NULL) {
    loop->config.default_accessors.setter(&loop->dispatcher);
  }

  (void)async_loop_run(loop, ZX_TIME_INFINITE, false);
  return 0;
}

// Starts a new thread named |name| that runs the loop until it quits or is
// shut down.  The thread is recorded so that async_loop_join_threads() can
// join it later.
//
// If |out_thread| is non-NULL, it receives the new thread's handle.
// Returns ZX_OK on success, ZX_ERR_BAD_STATE if the loop is already shut
// down, or ZX_ERR_NO_MEMORY if the record cannot be allocated or the thread
// cannot be created.
zx_status_t async_loop_start_thread(async_loop_t* loop, const char* name, thrd_t* out_thread) {
  ZX_DEBUG_ASSERT(loop);

  // This check is inherently racy.  The client should not be racing shutdown
  // with attempts to start new threads.  This is mainly a sanity check.
  async_loop_state_t state = atomic_load_explicit(&loop->state, memory_order_acquire);
  if (state == ASYNC_LOOP_SHUTDOWN)
    return ZX_ERR_BAD_STATE;

  thread_record_t* rec = calloc(1u, sizeof(*rec));
  if (!rec)
    return ZX_ERR_NO_MEMORY;

  if (thrd_create_with_name(&rec->thread, async_loop_run_thread, loop, name) != thrd_success) {
    free(rec);
    return ZX_ERR_NO_MEMORY;
  }

  // Copy the handle out before publishing the record.  Once |rec| is on
  // |thread_list| and the lock is released, async_loop_join_threads() running
  // on another thread may remove and free it, so |rec| must not be
  // dereferenced past that point.
  const thrd_t thread = rec->thread;

  mtx_lock(&loop->lock);
  list_add_tail(&loop->thread_list, &rec->node);
  mtx_unlock(&loop->lock);

  if (out_thread)
    *out_thread = thread;
  return ZX_OK;
}

// Joins every thread previously started with async_loop_start_thread() and
// frees its record.  Records are taken in creation order.  The lock is only
// held while the list is manipulated, never while blocked in thrd_join().
void async_loop_join_threads(async_loop_t* loop) {
  ZX_DEBUG_ASSERT(loop);

  mtx_lock(&loop->lock);
  list_node_t* node;
  while ((node = list_remove_head(&loop->thread_list)) != NULL) {
    // |node| is the first member of thread_record_t, so the node pointer is
    // also the record pointer.
    thread_record_t* const rec = (thread_record_t*)node;
    mtx_unlock(&loop->lock);

    // The record is no longer reachable from the list; take the handle,
    // release the record, then wait for the thread to exit.
    const thrd_t thread = rec->thread;
    free(rec);
    int result = thrd_join(thread, NULL);
    ZX_DEBUG_ASSERT(result == thrd_success);

    mtx_lock(&loop->lock);
  }
  mtx_unlock(&loop->lock);
}
